feat: add optional Langfuse observability for Claude Agent SDK calls #1194
lraphael wants to merge 6 commits into coleam00:dev from
Conversation
📝 Walkthrough

Adds an optional Langfuse/OpenTelemetry observability module, lifecycle hooks (init/shutdown) in CLI/server, an AsyncLocalStorage-based observability context, wraps key entrypoints with that context, and instruments provider streaming pipelines via a traceQuery async-generator wrapper.
Sequence Diagram

```mermaid
sequenceDiagram
    participant App as Application
    participant Langfuse as Langfuse/OTel
    participant Context as Observability Context
    participant Handler as Message/Workflow Handler
    participant Provider as Provider Stream
    App->>Langfuse: initLangfuse()
    activate Langfuse
    Langfuse->>Langfuse: configure OTel & start SDK
    Langfuse-->>App: initialized
    deactivate Langfuse
    App->>Handler: incoming request / start workflow
    activate Handler
    Handler->>Context: withObservabilityContext(attrs)
    activate Context
    Context->>Handler: run handler inside context
    Handler->>Provider: traceQuery(prompt, model, generator)
    activate Provider
    Provider->>Langfuse: emit spans/observations (if enabled)
    Langfuse-->>Provider: ack
    Provider-->>Handler: streamed chunks (tools, assistant, result)
    deactivate Provider
    Context-->>Handler: return result
    deactivate Context
    Handler-->>App: response
    deactivate Handler
    App->>Langfuse: shutdownLangfuse()
    activate Langfuse
    Langfuse->>Langfuse: flush spans & shutdown SDK
    Langfuse-->>App: shutdown complete
    deactivate Langfuse
```
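The AsyncLocalStorage-based context from the walkthrough can be pictured with a minimal TypeScript sketch. `withObservabilityContext` appears in the diagram above; the store shape (`ObservabilityAttrs`) and the getter name are assumptions, not the PR's actual exports:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical attribute shape, based on the fields the PR propagates.
interface ObservabilityAttrs {
  conversationId?: string;
  platformType?: string;
  workflowName?: string;
}

const storage = new AsyncLocalStorage<ObservabilityAttrs>();

// Run a handler inside an observability context; anything async started
// within fn() can read the attributes without threading them through
// every call signature.
export function withObservabilityContext<T>(
  attrs: ObservabilityAttrs,
  fn: () => Promise<T>
): Promise<T> {
  return storage.run(attrs, fn);
}

// Read the current context (undefined outside withObservabilityContext).
export function getObservabilityAttrs(): ObservabilityAttrs | undefined {
  return storage.getStore();
}
```

The tracing layer can then call `getObservabilityAttrs()` from deep inside the provider stream without the handler passing IDs down explicitly.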
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/providers/src/observability.ts`:
- Around line 87-117: The init function currently assigns instrumentedQueryFn
before completing setup and on any error returns false while leaving module
state stale; update the failure paths (catch block and any early-return branches
around otelSdk initialization) to reset module-level state by clearing
instrumentedQueryFn, setting otelSdk to null/undefined and setting initialized
to false before returning, so callers cannot receive a partially-initialized
instrumentedQueryFn when init fails; reference the symbols instrumentedQueryFn,
otelSdk, and initialized when applying the resets.
In `@packages/server/src/index.ts`:
- Around line 621-624: shutdownLangfuse() is currently fired and not awaited, so
span flush can be interrupted by process exit; update the teardown flow to await
shutdownLangfuse() (use await shutdownLangfuse() inside the async
shutdown/teardown function or return the promise chain) and wrap it in try/catch
to log errors via getLog().error (preserve the 'langfuse.shutdown_failed'
context) before calling process.exit so the shutdown completes before the
process terminates.
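Taken together, the two requested fixes might look like the sketch below. The module symbols (instrumentedQueryFn, otelSdk, initialized) and the log context mirror the comments above; everything else (the logger declaration, the SDK shape) is an assumption, not the actual source:

```typescript
// Assumed external pieces, declared only so the sketch is self-contained.
declare function getLog(): { error(ctx: object, msg: string): void };
declare function shutdownLangfuse(): Promise<void>;

let instrumentedQueryFn: unknown;
let otelSdk: { start(): void; shutdown(): Promise<void> } | null = null;
let initialized = false;

export function initLangfuse(): boolean {
  try {
    // ... build the exporter, assign instrumentedQueryFn, start otelSdk ...
    initialized = true;
    return true;
  } catch {
    // Reset all module state so getQuery() falls back to the original SDK
    // instead of receiving a partially-initialized instrumentedQueryFn.
    instrumentedQueryFn = undefined;
    otelSdk = null;
    initialized = false;
    return false;
  }
}

// Server teardown: await the span flush before exiting.
export async function onShutdown(): Promise<void> {
  try {
    await shutdownLangfuse();
  } catch (err) {
    getLog().error({ err }, 'langfuse.shutdown_failed');
  }
  process.exit(0);
}
```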
⛔ Files ignored due to path filters (1)

- bun.lock is excluded by !**/*.lock

📒 Files selected for processing (10)

- .env.example
- packages/cli/src/cli.ts
- packages/core/src/orchestrator/orchestrator-agent.ts
- packages/providers/package.json
- packages/providers/src/claude/provider.ts
- packages/providers/src/index.ts
- packages/providers/src/observability.test.ts
- packages/providers/src/observability.ts
- packages/server/src/index.ts
- packages/workflows/src/dag-executor.ts
Actionable comments posted: 3
🧹 Nitpick comments (1)
packages/providers/src/codex/provider.ts (1)
588-590: Pass the resolved model into tracing, not only the request override.

requestOptions?.model can be undefined while Codex actually uses threadOptions.model. Passing the resolved value improves trace accuracy.

💡 Proposed fix

```diff
 yield* traceQuery(
   prompt,
-  requestOptions?.model,
+  threadOptions.model ?? requestOptions?.model,
   streamCodexEvents(
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/providers/src/codex/provider.ts` around lines 588 - 590, The trace call is using requestOptions?.model which may be undefined; update the yield* traceQuery invocation in provider.ts to pass the resolved model value (the final model variable used by Codex, e.g., threadOptions.model or the computed model variable) instead of requestOptions?.model so tracing receives the actual model in use; locate the yield* traceQuery(...) call and replace the second argument with the resolved model identifier used earlier in the function (the one selected from threadOptions.model / requestOptions fallback).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/providers/src/observability.ts`:
- Around line 166-173: The code currently uses a single mutable activeToolCall
slot (activeToolCall) which misattributes overlapping or concurrent tool runs;
replace this with a map keyed by toolCallId (e.g., toolCallMap: Record<string, {
name:string; input?:unknown; startTime:number; chunks: string[] }>) and update
all handlers that append outputs (where MessageChunk and tool call lifecycle are
processed) to locate the tool entry by MessageChunk.toolCallId, pushing chunk
text into that entry and computing duration on close; also implement a small
fallback queue for chunks lacking toolCallId (to attach when the next toolCallId
is opened) so existing behavior is preserved, and update uses of toolCalls,
activeToolCall, and any logic in the blocks around the previous activeToolCall
handling (including the sections referenced around 184-206) to read from the map
and finalize by moving entries into the toolCalls array.
- Line 240: Remove the hardcoded Claude-specific tags by replacing the literal
'claude-agent-query' and 'claude-sonnet-4-20250514' in observability.ts with
provider-neutral defaults (e.g., 'ai-agent-query' or just 'agent-query') and
make the model tag optional: only append a model-specific tag when a model name
is provided (use a conditional that includes modelName if present). Locate the
occurrences of the exact string literals 'claude-agent-query' and
'claude-sonnet-4-20250514' and update the array/metadata construction so traces
are provider-neutral and include model info only when available.
- Around line 174-223: The current loop buffers all MessageChunk objects into
chunks before emitting, which breaks streaming; instead, inside the for-await
loop in the generator consumer (where chunks, streamError, activeToolCall,
toolCalls, textParts, usage, cost, numTurns, sessionId are handled) immediately
forward/yield each processed chunk to the downstream consumer as soon as it's
handled (after updating textParts/toolCalls/etc.), while still appending to
chunks for post-stream observability; move any Langfuse/logging/trace submission
logic (currently run after the loop) to execute only once after the stream
finishes or in the catch/finally to preserve streaming semantics; apply the same
change pattern to the similar block around lines 285-293.
---
Nitpick comments:
In `@packages/providers/src/codex/provider.ts`:
- Around line 588-590: The trace call is using requestOptions?.model which may
be undefined; update the yield* traceQuery invocation in provider.ts to pass the
resolved model value (the final model variable used by Codex, e.g.,
threadOptions.model or the computed model variable) instead of
requestOptions?.model so tracing receives the actual model in use; locate the
yield* traceQuery(...) call and replace the second argument with the resolved
model identifier used earlier in the function (the one selected from
threadOptions.model / requestOptions fallback).
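The Map-based tool-call correlation requested above could be sketched as follows. The entry shape and the fallback queue follow the comment's wording; the helper names (openToolCall, appendToolOutput, closeToolCall) are illustrative, not the PR's actual code:

```typescript
interface ToolCallEntry {
  name: string;
  input?: unknown;
  startTime: number;
  chunks: string[];
}

// Open tool calls keyed by toolCallId, so overlapping runs don't collide.
const toolCallMap = new Map<string, ToolCallEntry>();
// Finalized calls, with duration computed on close.
const toolCalls: Array<ToolCallEntry & { durationMs: number }> = [];
// Chunks that arrive without a toolCallId wait here until one opens.
const orphanChunks: string[] = [];

export function openToolCall(toolCallId: string, name: string, input?: unknown): void {
  const entry: ToolCallEntry = { name, input, startTime: Date.now(), chunks: [] };
  entry.chunks.push(...orphanChunks.splice(0)); // attach any queued chunks
  toolCallMap.set(toolCallId, entry);
}

export function appendToolOutput(toolCallId: string | undefined, text: string): void {
  const entry = toolCallId ? toolCallMap.get(toolCallId) : undefined;
  if (entry) entry.chunks.push(text);
  else orphanChunks.push(text); // fallback for chunks lacking a toolCallId
}

export function closeToolCall(toolCallId: string): void {
  const entry = toolCallMap.get(toolCallId);
  if (!entry) return;
  toolCallMap.delete(toolCallId);
  toolCalls.push({ ...entry, durationMs: Date.now() - entry.startTime });
}
```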
⛔ Files ignored due to path filters (1)

- bun.lock is excluded by !**/*.lock

📒 Files selected for processing (5)

- packages/providers/package.json
- packages/providers/src/claude/provider.ts
- packages/providers/src/codex/provider.ts
- packages/providers/src/observability.test.ts
- packages/providers/src/observability.ts
✅ Files skipped from review due to trivial changes (1)
- packages/providers/src/observability.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/providers/package.json
- packages/providers/src/claude/provider.ts
@coderabbitai review

✅ Actions performed: Review triggered.
@coderabbitai resume

✅ Actions performed: Reviews resumed.
Auto-instrument all Claude Agent SDK query() calls via OpenTelemetry when the LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY env vars are set. Zero overhead when not configured.

- Add observability module with AsyncLocalStorage context propagation
- Instrument query() via @arizeai/openinference-instrumentation-claude-agent-sdk
- Initialize at server and CLI entry points, flush on shutdown
- Propagate conversationId, platformType, workflowName to traces
- Widen withFirstMessageTimeout to accept AsyncIterable (instrumented query returns AsyncIterable, not AsyncGenerator; see the sketch below)
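A hedged sketch of that widening: the helper now takes any AsyncIterable rather than requiring an AsyncGenerator. The signature and the race-only-the-first-message behavior are inferred from the name, not confirmed by the source:

```typescript
async function* withFirstMessageTimeout<T>(
  source: AsyncIterable<T>, // previously AsyncGenerator<T>
  timeoutMs: number
): AsyncGenerator<T> {
  const it = source[Symbol.asyncIterator]();
  let timer: ReturnType<typeof setTimeout> | undefined;
  // Race only the first message against the deadline.
  const first = await Promise.race([
    it.next(),
    new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('first message timeout')), timeoutMs);
    }),
  ]).finally(() => clearTimeout(timer));
  if (first.done) return;
  yield first.value;
  // After the first message arrives, stream the rest without a deadline.
  while (true) {
    const next = await it.next();
    if (next.done) return;
    yield next.value;
  }
}
```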
…tdown

- Reset instrumentedQueryFn/otelSdk/initialized in catch block so getQuery() falls back to the original SDK after a partial init failure
- Chain shutdownLangfuse() before persistence.flushAll() in the server shutdown handler so spans are flushed before process.exit()
- Allow shutdownLangfuse() to always reset state even if otelSdk is null
…roviders

The subprocess-based Claude Agent SDK can't be auto-instrumented for input/output capture. Replace with a manual traceQuery() wrapper that:

- Captures the prompt as input and the collected response as output
- Records token usage, cost, and num_turns from result events
- Creates child "tool" spans for each tool call with input/output
- Works for both the Claude and Codex providers
- Removes the @arizeai/openinference-instrumentation-claude-agent-sdk dep

Also adds unit tests for tool call tracking, empty generators, and cost/usage passthrough (19 tests total).
…tral names

- Yield chunks immediately instead of buffering (preserves streaming semantics)
- Correlate tool calls by toolCallId via a Map instead of a single active slot
- Rename trace from 'claude-agent-query' to 'agent-query' (provider-neutral)
- Make model optional instead of defaulting to claude-sonnet
Post-yield code in async generators is not reliably executed through multiple yield* delegation layers in Bun. Revert to buffered approach: collect all chunks, create Langfuse trace, then yield chunks. Also adds explicit forceFlush() before OTel SDK shutdown and a regression test simulating the yield* delegation chain.
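The buffered wrapper these commits describe might look like the sketch below. The chunk shape and the helpers `isLangfuseEnabled`/`recordTrace` are assumptions standing in for the real trace API:

```typescript
declare function isLangfuseEnabled(): boolean;
declare function recordTrace(trace: {
  prompt: string;
  model?: string;
  chunks: unknown[];
  error?: unknown;
}): void;

interface MessageChunk { type: string; text?: string }

export async function* traceQuery(
  prompt: string,
  model: string | undefined,
  source: AsyncGenerator<MessageChunk>
): AsyncGenerator<MessageChunk> {
  // Zero-overhead passthrough when Langfuse is not configured.
  if (!isLangfuseEnabled()) {
    yield* source;
    return;
  }
  const chunks: MessageChunk[] = [];
  let streamError: unknown;
  try {
    for await (const chunk of source) chunks.push(chunk);
  } catch (err) {
    streamError = err;
  }
  // Record the trace *before* yielding: post-yield code is not reliably
  // executed through stacked yield* delegation in Bun.
  recordTrace({ prompt, model, chunks, error: streamError });
  yield* chunks;
  if (streamError !== undefined) throw streamError;
}
```

The tradeoff is latency: with tracing enabled, consumers see no chunks until the source stream completes, which is exactly what the review comments below ask to document.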
Force-pushed from 10392e6 to 1dc9f68
🧹 Nitpick comments (3)
packages/providers/src/observability.ts (1)
171-173: Streaming latency tradeoff acknowledged but worth documenting in the public API.

The current implementation buffers all chunks before yielding to work around Bun's unreliable post-yield execution in yield* chains. While the internal comment explains this (lines 171-173), consumers of traceQuery may not expect this latency hit when Langfuse is enabled.

Consider adding a note to the JSDoc (lines 146-156) indicating that when Langfuse is initialized, chunks are buffered until stream completion before being yielded.
Also applies to: 300-303
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/providers/src/observability.ts` around lines 171 - 173, Update the JSDoc for the traceQuery async generator to document the buffering behavior: note that when Langfuse (or trace collection) is enabled the implementation buffers all chunks and only yields them after the stream completes (to work around Bun's unreliable post-yield execution), which may increase streaming latency; add the same note for the related streaming helper referenced around the other comment block so consumers are aware of the tradeoff and can opt out if low-latency streaming is required.

packages/providers/src/observability.test.ts (1)
266-284: Test expectation may not match implementation behavior.

The test expects chunks.length to be 1 with the comment "Chunk was yielded immediately (streaming preserved), before error" (lines 281-282). However, the implementation buffers all chunks and yields them after stream completion (lines 300-303 in observability.ts).

The test passes because:
- One chunk is yielded by the generator
- Error is thrown and captured
- All buffered chunks (1) are yielded
- Error is rethrown
The comment is misleading — chunks are not yielded immediately but buffered. Consider updating the comment:
```diff
- // Chunk was yielded immediately (streaming preserved), before error
+ // Chunk was buffered then yielded after stream error was captured
  expect(chunks).toHaveLength(1);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/providers/src/observability.test.ts` around lines 266 - 284, The test's inline comment about "Chunk was yielded immediately (streaming preserved), before error" is misleading because traceQuery buffers chunks and yields them after stream completion; update the comment in the test (around the assertions after the try/catch) to state that the single chunk was buffered by traceQuery and yielded only after the generator errored, or otherwise adjust the expectation text to reflect buffering behavior of traceQuery.

packages/workflows/src/dag-executor.ts (1)
2150-2157: Type mismatch between outer and inner function signatures.

executeDagWorkflow accepts workflowProvider: string (line 2117), but executeDagWorkflowInner declares workflowProvider: 'claude' | 'codex' (line 2157). This creates an implicit unsafe type assertion when the outer function passes the provider to the inner function.

If the intent is to enforce valid providers, consider either:
- Narrowing the outer function's type to 'claude' | 'codex'
- Keeping the inner function's type as string to match

Option 1: Narrow outer function type
```diff
 export async function executeDagWorkflow(
   deps: WorkflowDeps,
   platform: IWorkflowPlatform,
   conversationId: string,
   cwd: string,
   workflow: { name: string; nodes: readonly DagNode[] } & WorkflowLevelOptions,
   workflowRun: WorkflowRun,
-  workflowProvider: string,
+  workflowProvider: 'claude' | 'codex',
   workflowModel: string | undefined,
```

Option 2: Keep inner function consistent
```diff
 async function executeDagWorkflowInner(
   deps: WorkflowDeps,
   platform: IWorkflowPlatform,
   conversationId: string,
   cwd: string,
   workflow: { name: string; nodes: readonly DagNode[] } & WorkflowLevelOptions,
   workflowRun: WorkflowRun,
-  workflowProvider: 'claude' | 'codex',
+  workflowProvider: string,
   workflowModel: string | undefined,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/workflows/src/dag-executor.ts` around lines 2150 - 2157, There is a type mismatch between executeDagWorkflow (which currently takes workflowProvider: string) and executeDagWorkflowInner (which expects workflowProvider: 'claude' | 'codex'); fix this by making the types consistent—preferably narrow the outer executeDagWorkflow signature to workflowProvider: 'claude' | 'codex' so the inner call is type-safe (update the executeDagWorkflow parameter type and any callers that pass other strings), or alternatively change executeDagWorkflowInner to accept workflowProvider: string if widening is intended; the symbols to edit are executeDagWorkflow and executeDagWorkflowInner and ensure all call sites and type annotations reflect the chosen union or string type.
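Beyond the two diffs above, Option 1 also needs the call sites that still hold a plain string to be validated. A hedged sketch of one way to do that; isWorkflowProvider and resolveProvider are illustrative additions, not code from the PR:

```typescript
type WorkflowProvider = 'claude' | 'codex';

// Runtime guard that narrows an untrusted string to the union.
function isWorkflowProvider(value: string): value is WorkflowProvider {
  return value === 'claude' || value === 'codex';
}

// Validate once at the boundary where the provider arrives as a string,
// so every downstream signature can use the narrowed union safely.
export function resolveProvider(raw: string): WorkflowProvider {
  if (!isWorkflowProvider(raw)) {
    throw new Error(`Unsupported workflow provider: ${raw}`);
  }
  return raw;
}
```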
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@packages/providers/src/observability.test.ts`:
- Around line 266-284: The test's inline comment about "Chunk was yielded
immediately (streaming preserved), before error" is misleading because
traceQuery buffers chunks and yields them after stream completion; update the
comment in the test (around the assertions after the try/catch) to state that
the single chunk was buffered by traceQuery and yielded only after the generator
errored, or otherwise adjust the expectation text to reflect buffering behavior
of traceQuery.
In `@packages/providers/src/observability.ts`:
- Around line 171-173: Update the JSDoc for the traceQuery async generator to
document the buffering behavior: note that when Langfuse (or trace collection)
is enabled the implementation buffers all chunks and only yields them after the
stream completes (to work around Bun's unreliable post-yield execution), which
may increase streaming latency; add the same note for the related streaming
helper referenced around the other comment block so consumers are aware of the
tradeoff and can opt out if low-latency streaming is required.
In `@packages/workflows/src/dag-executor.ts`:
- Around line 2150-2157: There is a type mismatch between executeDagWorkflow
(which currently takes workflowProvider: string) and executeDagWorkflowInner
(which expects workflowProvider: 'claude' | 'codex'); fix this by making the
types consistent—preferably narrow the outer executeDagWorkflow signature to
workflowProvider: 'claude' | 'codex' so the inner call is type-safe (update the
executeDagWorkflow parameter type and any callers that pass other strings), or
alternatively change executeDagWorkflowInner to accept workflowProvider: string
if widening is intended; the symbols to edit are executeDagWorkflow and
executeDagWorkflowInner and ensure all call sites and type annotations reflect
the chosen union or string type.
⛔ Files ignored due to path filters (1)

- bun.lock is excluded by !**/*.lock

📒 Files selected for processing (11)

- .env.example
- packages/cli/src/cli.ts
- packages/core/src/orchestrator/orchestrator-agent.ts
- packages/providers/package.json
- packages/providers/src/claude/provider.ts
- packages/providers/src/codex/provider.ts
- packages/providers/src/index.ts
- packages/providers/src/observability.test.ts
- packages/providers/src/observability.ts
- packages/server/src/index.ts
- packages/workflows/src/dag-executor.ts
✅ Files skipped from review due to trivial changes (2)
- .env.example
- packages/providers/src/index.ts
🚧 Files skipped from review as they are similar to previous changes (5)
- packages/cli/src/cli.ts
- packages/providers/package.json
- packages/providers/src/claude/provider.ts
- packages/server/src/index.ts
- packages/core/src/orchestrator/orchestrator-agent.ts
Thanks for the time and contribution. Langfuse is not something we are looking to add right now. Closing.
Why Langfuse?
Archon orchestrates AI coding agents across multiple platforms (Slack, Telegram, GitHub, Web, CLI) and runs multi-step workflows — but until now there was no visibility into what these agents actually do, how much they cost, or how their prompts perform.
Cost monitoring: Every query() call to Claude or Codex costs money. Workflows can chain multiple nodes, each spawning its own AI session with tools. Without tracing, costs are invisible — you only see the bill at the end of the month. Langfuse tracks token usage (input, output, cache) per call, letting you identify expensive workflows, optimize model choices (e.g. switching a routing step from Opus to Haiku), and set budgets with confidence.

Prompt optimization: Langfuse captures the full input/output of every AI call, including tool calls and their results. This makes it possible to review how the orchestrator routes messages, how workflow nodes interact, and where prompts could be tightened. You can compare prompt versions, spot unnecessary tool calls, and identify patterns that lead to poor responses — all from a dashboard instead of digging through logs.
Operational insight: With conversationId, platformType, and workflowName attached to every trace, you can filter by platform (e.g. "why are Slack conversations more expensive than Web?"), by workflow (e.g. "the review workflow uses 3x more tokens than expected"), or by conversation to debug a specific interaction end-to-end.
Langfuse is open-source and can be self-hosted — no data leaves your infrastructure if you choose.
Summary
Changes
Configuration
```bash
# Add to .env or ~/.archon/.env (all optional — disabled when not set)
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_BASE_URL=https://cloud.langfuse.com # or self-hosted URL
```
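How that gating might look in code, as a minimal sketch: init becomes a no-op unless both keys are present, which is what makes the feature zero-overhead when unconfigured. The exporter wiring is elided and the function name is assumed:

```typescript
export function initLangfuse(): boolean {
  const publicKey = process.env.LANGFUSE_PUBLIC_KEY;
  const secretKey = process.env.LANGFUSE_SECRET_KEY;
  if (!publicKey || !secretKey) {
    return false; // not configured: observability stays disabled
  }
  const baseUrl = process.env.LANGFUSE_BASE_URL ?? 'https://cloud.langfuse.com';
  // ... start the OTel SDK with a Langfuse exporter pointed at baseUrl ...
  return true;
}
```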
Test plan
Summary by CodeRabbit
New Features
Configuration
Tests
Chores